
Optimizing Models with Quantization-Aware Training in Keras - Part II

Performing Facial Keypoints Detection with Quantization Aware Training in Keras
Created on January 19|Last edited on February 10
This article is Part II of the 'Quantization in Keras' series. In Part I, we saw how to perform Post-Training Quantization in Keras on the Facial Keypoints Detection dataset.

Introduction

We defined "quantization" in the first part of this series but, by way of reminder: when we talk about quantization in this piece, we're specifically referring to the process of reducing the size of a model without significantly sacrificing accuracy. This is done frequently so that models can be deployed on edge devices like smartphones or microcontrollers.
In part one, we saw that Post-Training Quantization reduced the model size by roughly 10 times and even improved the model's accuracy. In part two, we'll look at the impact of Quantization-Aware Training on our model.
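As a quick refresher, Post-Training Quantization requires no retraining: it is a single converter pass over an already-trained model. Here is a minimal sketch, assuming trained_model is the trained Keras model from part one:

import tensorflow as tf

# Post-Training Quantization recap (sketch):
# `trained_model` is assumed to be the trained Keras model from part one.
converter = tf.lite.TFLiteConverter.from_keras_model(trained_model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_ptq_model = converter.convert()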
In Quantization-Aware Training, by contrast, we fine-tune the model with quantization emulated during training and then convert it to obtain the quantized model.
The first part of this article remains largely the same: we'll train a baseline ResNet-50 model on a dataset of images of people's faces. If you have read part one, you can jump directly to the section called Quantization-Aware Training.
Let's quantize:

Our Data and Our Goal

We’ll be using this facial keypoints dataset from Kaggle for our tutorial and experiments today. The goal of that competition, simply, was to detect keypoints on pictures of people’s faces.
Essentially, a keypoint helps a model understand and orient a person's face. They are, well, key points for a model: things like the tip of the nose, the center of an eye, or the corner of a mouth (but not, say, the center of a cheek or the forehead). Each keypoint has x and y pixel coordinates for its location. In this dataset we are given the x and y coordinates of 15 such keypoints per grayscale image, so the model has a total of 30 target values to predict per picture. The input image is given in the last field of the data files and consists of a row-ordered list of pixels, as integers in [0, 255]. The images are 96x96 pixels.
Again, our goal here is pretty basic: detect the locations of the keypoints, and use quantization to reduce the size of our model.

Preliminaries

We start by importing the necessary libraries. We will see how we use them as we move ahead.
# wandb
!pip install wandb

# Quantization Aware Training
!pip install tensorflow_model_optimization

import os
import time
import pathlib

import pandas as pd
import numpy as np

# Progress Bar
from tqdm.notebook import tqdm

# Data Visualization
import matplotlib.pyplot as plt
%matplotlib inline

# Sklearn
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

# Tensorflow
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.applications.resnet50 import ResNet50
import tensorflow_model_optimization as tfmot

# wandb
import wandb
from wandb.keras import WandbCallback

wandb.login()

Global Configuration

It's always good practice to keep a separate configuration file, or a class that holds the parameters, as shown below. We have four files in the dataset:
  1. Training file - a list of 7,049 training images. Each row contains the (x, y) coordinates for 15 keypoints, plus the image data as a row-ordered list of pixels.
  2. Test file - a list of 1,783 test images. Each row contains an ImageId and the image data as a row-ordered list of pixels.
  3. ID lookup table - specifies which keypoints are to be predicted.
  4. Sample submission - the format of the final submission file containing the predicted keypoint locations.
class config:
    DIRECTORY_PATH = "./Facial Keypoints Detection"
    TRAIN_FILE_PATH = DIRECTORY_PATH + "/input/training.csv"
    TEST_FILE_PATH = DIRECTORY_PATH + "/input/test.csv"
    ID_LOOKUP_TABLE_PATH = DIRECTORY_PATH + "/input/IdLookupTable.csv"
    SAMPLE_FILE_PATH = DIRECTORY_PATH + "/input/SampleSubmission.csv"
    SUBMISSION_FILE_PATH = DIRECTORY_PATH + "/outputs/submission.csv"

    BASELINE_CHECKPOINT_PATH = DIRECTORY_PATH + "/models/resnet50_baseline/checkpoint.ckpt"
    QUANTIZATION_AWARE_TRAINING_PATH = DIRECTORY_PATH + "/models/quant_aware_training/resnet50_quant_aware.tflite"

    VALIDATION_DATA_SIZE = 0.2

    BATCH_SIZE = 32
    NUM_EPOCHS = 150
    LEARNING_RATE = 0.03

Load Data

Now that we have set our configuration, we load our dataset using the load_data() function.
def load_data():

    """
    Function to load the train and test datasets
    """

    X_train = pd.read_csv(config.TRAIN_FILE_PATH)
    X_test_original = pd.read_csv(config.TEST_FILE_PATH)

    return X_train, X_test_original

Check Missing Values

The dataset contains many missing values: on inspection, we find that some images are labeled with all 15 keypoints, whereas others have only 4.
There are two possible approaches: either fill or remove the missing values and train one model on the entire dataset, or train two models, one for the images with 4 keypoints and another for the images with 15 keypoints, and then combine their predictions.
For simplicity, we drop the samples with missing values here, since we want to train a single model on the entire dataset and report the changes observed via optimization.
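Before dropping anything, it is worth checking how the missing values are distributed. A quick sketch, using the load_data() helper defined above:

# Inspect missingness per keypoint column
X_train_raw, _ = load_data()
print(X_train_raw.isnull().sum())  # missing count per column

# How many rows have all 15 keypoints labeled?
print(f"Complete rows: {X_train_raw.dropna().shape[0]} of {X_train_raw.shape[0]}")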
def remove_missing_values():

    """
    Function to drop the samples with missing values
    """

    X_train, X_test_original = load_data()

    X_train = X_train.dropna()
    y_train = X_train.drop(['Image'], axis=1)

    return X_train, y_train, X_test_original

Obtain Images

In the data files, each image is stored as a space-separated string of pixel values, so we need to convert it into the correct array format.
def get_images():

    """
    Function to get the correct image format from the data
    """

    X_train, y_train, X_test_original = remove_missing_values()

    # Train Data: parse each pixel string into a 96x96 array
    X_train = X_train.Image
    X_train = X_train.apply(lambda x: np.fromstring(x, dtype=int, sep=' ').reshape((96, 96)))

    # Test Data
    X_test_original = X_test_original.Image
    X_test_original = X_test_original.apply(lambda x: np.fromstring(x, dtype=int, sep=' ').reshape((96, 96)))

    # Scale pixel values to [0, 1]
    X_train /= 255.0
    X_test_original /= 255.0

    return X_train, y_train, X_test_original
Let us view some images with their keypoints to understand the problem better.
# View Images with their keypoints

X_train, y_train, _ = get_images()

fig = plt.figure(figsize=(9, 9))

for i in range(9):
    ax = fig.add_subplot(3, 3, i + 1)
    plt.imshow(X_train.iloc[i], cmap='gray')
    # Plot the 15 (x, y) keypoint pairs for this image
    for j in range(1, 31, 2):
        plt.plot(y_train.iloc[i][j - 1], y_train.iloc[i][j], 'ro')

plt.show()

Training Images with their Keypoints
As you can clearly see, each image has 15 keypoints, represented by the red dots at different locations on the face. These locations (the x and y pixel coordinates) are what we are trying to predict.
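To make the regression targets concrete, we can peek at the first few target columns; the column names come from the Kaggle dataset:

# Peek at the 30 regression targets for the first training sample
print(y_train.columns.tolist()[:4])  # ['left_eye_center_x', 'left_eye_center_y', ...]
print(y_train.iloc[0].values[:4])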

Prepare the Data

We process the data further, converting it to NumPy arrays and then flattening and reshaping it into the desired format.
def process_data():

    """
    Function to convert, flatten and reshape the arrays
    """

    X_train, y_train, X_test_original = get_images()

    # Convert to arrays (object arrays holding the 96x96 images)
    X_train = X_train.to_numpy()
    X_test_original = X_test_original.to_numpy()

    # Flatten the arrays
    X_train = X_train.flatten()
    X_test_original = X_test_original.flatten()

    # The first concatenate stacks the per-image arrays into shape (N*96, 96);
    # the second flattens the rows into a single 1-D array
    X_train = np.concatenate(X_train, axis=0)
    X_train = np.concatenate(X_train, axis=0)

    X_test_original = np.concatenate(X_test_original, axis=0)
    X_test_original = np.concatenate(X_test_original, axis=0)

    # Reshape the arrays to get correct image dimensions
    X_train = X_train.reshape((-1, 96, 96))
    X_test_original = X_test_original.reshape((-1, 96, 96))

    return X_train, y_train, X_test_original
To prepare our validation and test datasets, we simply use the train_test_split function offered by scikit-learn, applied twice.
# Obtain Training and Validation Data Splits

def get_data_split():

    """
    Function to obtain train, validation and test data splits
    """

    X_train, y_train, X_test_original = process_data()

    X_train, X_valid, y_train, y_valid = train_test_split(
        X_train,
        y_train,
        test_size=config.VALIDATION_DATA_SIZE,
        random_state=0
    )

    # Split the validation data again to carve out a held-out test set
    X_valid, X_test, y_valid, y_test = train_test_split(
        X_valid,
        y_valid,
        test_size=config.VALIDATION_DATA_SIZE,
        random_state=0
    )

    return X_train, X_valid, X_test, y_train, y_valid, y_test
After preparing the training, validation, and test datasets, we convert the images to 3 channels (they are originally single-channel grayscale), since the pretrained ResNet-50 expects 3-channel input. This is done in the get_correct_dimensions() function.
def get_correct_dimensions():

    """
    Function to convert the data into 3 channels
    """

    X_train, X_valid, X_test, y_train, y_valid, y_test = get_data_split()

    # Reshape all arrays
    X_train = X_train.reshape((-1, 96, 96, 1))
    X_valid = X_valid.reshape((-1, 96, 96, 1))
    X_test = X_test.reshape((-1, 96, 96, 1))

    # Concatenate the data with itself to obtain 3 channels
    X_train = np.concatenate((X_train, X_train, X_train), axis=-1)
    X_test = np.concatenate((X_test, X_test, X_test), axis=-1)
    X_valid = np.concatenate((X_valid, X_valid, X_valid), axis=-1)

    return X_train, X_test, X_valid, y_train, y_valid, y_test

ResNet50 Baseline Model

At this point we have preprocessed the dataset and obtained it in the correct format. The data is now ready for training.
For training the model, we will implement Transfer Learning using a pretrained ResNet50 model. You can try out larger models like EfficientNets too (a hypothetical swap is sketched below), but for our use case a single ResNet model is enough to achieve a decent score.
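For instance, an EfficientNet backbone would look roughly like this. This is an illustration only; we do not train this variant in the article:

from tensorflow.keras.applications import EfficientNetB0

# Hypothetical alternative backbone (illustration only): EfficientNetB0
# without its classification head, accepting our 96x96x3 inputs
eff_model = EfficientNetB0(
    include_top=False,
    weights='imagenet',
    input_shape=(96, 96, 3)
)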
Before training the model, we need to define certain callbacks. In this case we define four:
  • TensorBoard to log training metrics
  • Early Stopping to prevent the model from overfitting
  • A Checkpoint callback to save the model weights
  • The WandbCallback to log everything to our W&B dashboard
def get_callbacks(checkpoint_path, callbacks=tf.keras.callbacks):

    """
    Function to declare the callbacks
    """

    early_stopping = callbacks.EarlyStopping(
        min_delta=0.1,  # minimum amount of change to count as an improvement
        patience=20,  # how many epochs to wait before stopping
        restore_best_weights=True,
    )

    # Create a callback that saves the model's weights
    checkpoint_callback = keras.callbacks.ModelCheckpoint(
        filepath=checkpoint_path,
        save_weights_only=True,
    )

    # Define the callbacks
    callbacks = [
        keras.callbacks.TensorBoard(log_dir='./logs'),
        early_stopping,
        checkpoint_callback,
        WandbCallback()
    ]

    return callbacks
Then we define our ResNet model with pretrained weights. We freeze the first 143 pretrained layers, leaving the final block trainable, and add custom layers at the end.
def get_custom_resnet():

    """
    Function to load the pretrained ResNet model and freeze certain layers
    """

    inputs = keras.Input(shape=(96, 96, 3))

    # Define the ResNet50 model with pretrained weights (Transfer Learning).
    # include_top=False drops the ImageNet classification head, which also
    # lets us use 96x96 inputs instead of the default 224x224.
    res_model = ResNet50(
        input_tensor=inputs,
        include_top=False,
        weights='imagenet'
    )

    # Freeze the first 143 layers; the rest stay trainable
    for layer in res_model.layers[:143]:
        layer.trainable = False

    return res_model
Finally, we define the entire architecture: three simple data augmentation layers, followed by the ResNet model, a flatten layer, and a dense layer.
# Final Architecture with Data Augmentations
def setup_model():

    """
    Function to define the final model architecture with data augmentations
    """

    res_model = get_custom_resnet()

    model = keras.models.Sequential()
    model.add(keras.layers.RandomRotation(factor=0.5))
    model.add(keras.layers.RandomFlip())
    model.add(keras.layers.RandomContrast(factor=0.5))
    model.add(res_model)
    model.add(keras.layers.Flatten())
    model.add(keras.layers.Dense(30, activation='relu'))

    return model

def setup_pretrained_model(checkpoint_path):

    """
    Function to load the pretrained model
    """

    model = make_model()
    model.load_weights(checkpoint_path)

    return model
Now we define the optimizer, which in our case is Adam, and compile the model for training. Notice that we are using Root Mean Squared Error as the metric here.
def make_model():

    """
    Function to load the model, define the optimizer and compile the model
    """

    model = setup_model()

    # Define Optimizer
    optimizer = keras.optimizers.Adam(learning_rate=config.LEARNING_RATE)

    # Compile the model
    model.compile(
        optimizer=optimizer,
        loss='mean_squared_error',
        metrics=[tf.keras.metrics.RootMeanSquaredError()]
    )

    return model
We now load our datasets, obtain the callbacks, and set up the model training function.
def run_model(checkpoint_path):

    """
    Function to fit the model on the dataset and report the training time
    """

    X_train, X_test, X_valid, y_train, y_valid, y_test = get_correct_dimensions()

    model = make_model()

    callbacks = get_callbacks(checkpoint_path=checkpoint_path)

    # Train model and save history
    start = time.time()

    history = model.fit(
        X_train,
        y_train,
        validation_data=(X_valid, y_valid),
        batch_size=config.BATCH_SIZE,
        epochs=config.NUM_EPOCHS,
        callbacks=callbacks
    )

    end = time.time()

    print(f"Total Time taken for Model Training: {end - start} seconds.")

    return history

Run Model

We initialize wandb to record our run and start training our model.
run = wandb.init(project='Quantization Aware Training')

history = run_model(config.BASELINE_CHECKPOINT_PATH)

wandb.finish()

Run: Baseline ResNet-50 (W&B charts: training and validation loss)

From the above charts, it is clear that both the training and validation losses decrease as the number of epochs increases, so the model is not overfitting.

Model Evaluation and Size

We will create two functions: evaluate_model() to compute the metric (root mean squared error) on the held-out test split, and get_model_size() to report the size of the model on disk.
def evaluate_model(checkpoint_path):

    """
    Function to calculate loss and root mean squared error on the held-out test split
    """

    model = setup_pretrained_model(checkpoint_path)

    _, X_test, _, _, _, y_test = get_correct_dimensions()

    loss, rmse = model.evaluate(X_test, y_test)  # returns loss and metrics
    print("loss: %.2f" % loss)
    print("rmse: %.2f" % rmse)

def get_model_size(path, checkpoint=True):

    """
    Function to obtain the size of a model on disk
    """

    if checkpoint:
        # TensorFlow checkpoints store the weights in a .data-* shard
        filepath = path + '.data-00000-of-00001'
    else:
        filepath = path

    print(f"{os.path.getsize(filepath)/float(1<<20):,.0f} MB")

# Evaluate Model
evaluate_model(config.BASELINE_CHECKPOINT_PATH)

# Model Size
get_model_size(config.BASELINE_CHECKPOINT_PATH)

Baseline Model Conclusion:

These are the results obtained from the baseline model we trained.
Model Description -
  • ResNet50 with Pretrained Weights along with Data Augmentations
Result -
  • Root Mean Squared Error = 3.05
  • Training Time = 776 seconds
  • Model Size = 228 MB
We will refer back to this conclusion after we quantize the model.

Quantization-Aware Training

Quantization-Aware Training emulates inference-time quantization during training, producing a model that downstream tools can convert into an actually quantized model. The quantized model uses lower precision (e.g., 8-bit integers instead of 32-bit floats), which pays off in size and speed at deployment time.
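To build intuition for what "emulating quantization" means, here is a toy fake-quantization function. This is an illustrative sketch only; TensorFlow's actual fake-quant ops differ in detail:

import numpy as np

# Snap values to an 8-bit grid, then dequantize. QAT inserts ops like this
# into the forward pass so the network learns to tolerate the rounding error.
# (A sketch; TF's real implementation differs in detail.)
def fake_quant(x, num_bits=8):
    qmin, qmax = 0, 2 ** num_bits - 1
    scale = (x.max() - x.min()) / (qmax - qmin)
    zero_point = qmin - x.min() / scale
    q = np.clip(np.round(x / scale + zero_point), qmin, qmax)
    return scale * (q - zero_point)

x = np.linspace(-1.0, 1.0, 5).astype(np.float32)
print(x)              # original float values
print(fake_quant(x))  # values snapped to the 8-bit grid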

Define the Model

There are multiple ways to apply Quantization-Aware Training.
You can quantize the entire model (i.e., all the layers), or quantize only selected layers.
The question is: if quantization reduces model size, wouldn't it be better to just quantize the entire model? Why do we even have the option to quantize only some layers?
The reason is simple: quantizing a model can hurt accuracy. Selectively quantizing layers lets us explore the trade-off between accuracy, speed, and model size.
Quantization-Aware Training is a method of fine-tuning, so we need to create a quantization-aware model and train it again. With Post-Training Quantization, by contrast, we did not need to retrain the model. (A whole-model sketch follows below; after that we take the selective route.)
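For reference, quantizing the whole model is a single call to tfmot.quantization.keras.quantize_model(). A minimal sketch on a toy model (the toy model is purely for illustration):

import tensorflow as tf
import tensorflow_model_optimization as tfmot

# Toy model, purely to illustrate the whole-model API
toy = tf.keras.Sequential([
    tf.keras.layers.Dense(16, activation='relu', input_shape=(8,)),
    tf.keras.layers.Dense(4),
])

# One call annotates and wraps every supported layer with fake-quantization ops
quant_aware_toy = tfmot.quantization.keras.quantize_model(toy)
quant_aware_toy.summary()  # layers now appear wrapped for quantization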
Step I is loading the Baseline ResNet-50 model. Here's the code:
# Load model with pretrained weights
model = setup_pretrained_model(config.BASELINE_CHECKPOINT_PATH)
In Step II, we're going to quantize only some layers; specifically, only the Dense layers of the model. This needs to be communicated to TensorFlow.
So we define a helper function, apply_quantization_to_dense(), which specifies that only the Dense layers need to be quantized. It annotates those layers using the quantize_annotate_layer() function provided by the TensorFlow Model Optimization Toolkit.
def apply_quantization_to_dense(layer):
    # Annotate Dense layers for quantization; leave everything else unchanged
    if isinstance(layer, tf.keras.layers.Dense):
        return tfmot.quantization.keras.quantize_annotate_layer(layer)
    return layer
Next, in Step III, we clone the model before applying quantization. We use tf.keras.models.clone_model, which applies apply_quantization_to_dense to every layer of the model.
annotated_model = tf.keras.models.clone_model(
    model,
    clone_function=apply_quantization_to_dense,
)
In Step IV, we build the annotated_model, i.e., specify the model's input shape. With the Dense layers annotated, quantize_apply then actually makes the model quantization-aware.
# Build Model
annotated_model.build((None, 96, 96, 3))

# Now that the Dense layers are annotated,
# `quantize_apply` actually makes the model quantization aware.
quant_aware_model = tfmot.quantization.keras.quantize_apply(annotated_model)
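As a quick optional check, we can print the class names of the top-level layers; only the annotated Dense layers should show up wrapped for quantization:

# Sanity check: annotated layers appear as QuantizeWrapper layers
for layer in quant_aware_model.layers:
    print(layer.__class__.__name__)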
In Step V, similar to training any regular deep learning model, we specify the optimizer and compile the model for training.
# Define Optimizer
optimizer = keras.optimizers.Adam(learning_rate=0.03)

# `quantize_apply` requires a recompile.
quant_aware_model.compile(
    optimizer=optimizer,
    loss='mean_squared_error',
    metrics=[tf.keras.metrics.RootMeanSquaredError()]
)
Moving ahead to Step VI, we load the required datasets and define the Early Stopping callback along with the others. Then we train the model, using wandb to log everything.
# Load datasets
X_train, X_test, X_valid, y_train, y_valid, y_test = get_correct_dimensions()

# Define Early Stopping
early_stopping = tf.keras.callbacks.EarlyStopping(
    min_delta=0.1,  # minimum amount of change to count as an improvement
    patience=20,  # how many epochs to wait before stopping
    restore_best_weights=True,
)

run = wandb.init(project='Quantization Aware Training')

# Define the callbacks
callbacks = [
    keras.callbacks.TensorBoard(log_dir='./logs'),
    early_stopping,
    WandbCallback()
]

# Train model and save history
start = time.time()

history_quant_aware = quant_aware_model.fit(
    X_train,
    y_train,
    validation_data=(X_valid, y_valid),
    batch_size=config.BATCH_SIZE,
    epochs=config.NUM_EPOCHS,
    callbacks=callbacks
)

wandb.finish()

end = time.time()

print(f"Total Time taken for Model Training: {end - start} seconds.")

Run: Quantization Aware Training (W&B charts: training and validation loss)

From the above charts we can conclude that quantization did not hurt the model's accuracy; in fact, it improved slightly compared to the baseline model.
But wait, we are not done yet!
So far, though, we only have a quantization-aware float model. Converting it with the TensorFlow Lite converter produces the actually quantized model, with int8 weights and uint8 activations.
# Convert to TensorFlow Lite Model with optimization
converter = tf.lite.TFLiteConverter.from_keras_model(quant_aware_model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
quantized_tflite_model = converter.convert()
Now we save the quantized model and write the file in tflite format.
# Save the Model
dir_path = config.DIRECTORY_PATH + "/models/quant_aware_training/"

tflite_models_dir = pathlib.Path(dir_path)
tflite_models_dir.mkdir(exist_ok=True, parents=True)

tflite_model_quant_aware_file = tflite_models_dir/"resnet50_quant_aware.tflite"

# Write File
tflite_model_quant_aware_file.write_bytes(quantized_tflite_model)

interpreter = tf.lite.Interpreter(model_content=quantized_tflite_model)
interpreter.allocate_tensors()
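As an optional sanity check, we can inspect a few tensors of the converted model to confirm that the quantized weights are indeed integer-typed:

# Inspect the first few tensors of the quantized TFLite model
for detail in interpreter.get_tensor_details()[:5]:
    print(detail['name'], detail['dtype'])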
At this stage we have an actually quantized TFLite model. We will now evaluate it against the held-out test split.
# A helper function to evaluate the TFLite model on the held-out test split

def evaluate_quant_model(interpreter):

    input_index = interpreter.get_input_details()[0]["index"]
    output_index = interpreter.get_output_details()[0]["index"]

    # Run predictions on every image in the test split
    final_predictions = []
    rmse = 0

    for test_image, target in tqdm(zip(X_test, y_test.values), total=len(X_test)):

        # Pre-processing: add batch dimension and convert to float32 to match
        # the model's input data format
        test_image = test_image.astype(np.float32)
        interpreter.set_tensor(input_index, tf.expand_dims(test_image, axis=0))

        # Run inference
        interpreter.invoke()

        # Post-processing: obtain predictions
        predictions = interpreter.get_tensor(output_index).flatten()
        final_predictions.append(predictions)

        # Accumulate the average per-sample RMSE
        rmse += mean_squared_error(target, predictions, squared=False) / len(X_test)

    return rmse

# Root Mean Squared Error
rmse = evaluate_quant_model(interpreter)

print(f"RMSE for Quantization Aware Training: {rmse}")

# Model Size
get_model_size(config.QUANTIZATION_AWARE_TRAINING_PATH, checkpoint = False)

Quantization-Aware Training Model Conclusion:

Model Description
  • ResNet50 with Pretrained Weights along with Data Augmentations
  • Quantization-Aware Training on Dense Layers
Results
  • Root Mean Squared Error = 2.86 (considerably better than the baseline model, and similar to Post-Training Quantization)
  • Training Time = 151 seconds (less than the baseline model)
  • Model Size = 97 MB (considerably smaller than the baseline, but larger than with Post-Training Quantization)

TFLite Inference

In the final step, we perform inference on the original test dataset provided in the competition. The steps are similar to those in the last section.
def tflite_inference(interpreter):

    """
    Function to perform inference with a TFLite model
    """

    # Prepare the original competition test dataset (3 channels)
    _, _, X_test_original = process_data()
    X_test_original = X_test_original.reshape((-1, 96, 96, 1))
    X_test_original = np.concatenate((X_test_original, X_test_original, X_test_original), axis=-1)

    input_index = interpreter.get_input_details()[0]["index"]
    output_index = interpreter.get_output_details()[0]["index"]

    # Run predictions on every image in the test dataset
    final_predictions = []

    for test_image in tqdm(X_test_original, total=len(X_test_original)):

        # Pre-processing: add batch dimension and convert to float32 to match
        # the model's input data format
        test_image = test_image.astype(np.float32)
        interpreter.set_tensor(input_index, tf.expand_dims(test_image, axis=0))

        # Run inference
        interpreter.invoke()

        # Post-processing: obtain predictions
        predictions = interpreter.get_tensor(output_index).flatten()
        final_predictions.append(predictions)

    return final_predictions

# Load the TFLite model and allocate tensors.
interpreter = tf.lite.Interpreter(model_path=config.QUANTIZATION_AWARE_TRAINING_PATH)
interpreter.allocate_tensors()

predictions = tflite_inference(interpreter)
After we have obtained the predictions, we create a submission file for the competition, which contains the RowId along with the predicted location of each keypoint.
def generate_submission(predictions):

    """
    Function to create the submission file from the model predictions
    """

    # Convert list to array
    predictions = np.array(predictions)

    # Get the target column names
    _, _, _, y_train, _, _ = get_correct_dimensions()
    cols = list(y_train.columns)

    # Load the Id Lookup Table
    df = pd.read_csv(config.ID_LOOKUP_TABLE_PATH)

    # Fill in the predicted locations
    for i in range(df.shape[0]):
        df.loc[i, 'Location'] = predictions[df.ImageId[i] - 1][cols.index(df.FeatureName[i])]

    # Drop columns which are not required in the submission
    df.drop(['ImageId', 'FeatureName'], axis=1, inplace=True)

    # Set RowId as index
    df = df.set_index(['RowId'])

    # Save submission file
    df.to_csv(config.SUBMISSION_FILE_PATH)

generate_submission(predictions)
We will plot a couple of images from the test set to see how well our model performs.
def plot_test_image(df, prediction, index):

    """
    Function to plot test-set images with their predicted keypoints
    """

    image = plt.imshow(df[index], cmap='gray')
    l = []
    for i in range(1, 31, 2):
        l.append(plt.plot(prediction[index][i - 1], prediction[index][i], 'ro'))

    return image, l

# Rebuild the original competition test images so they line up with `predictions`
_, _, X_test_original = process_data()

fig = plt.figure(figsize=(20, 20))

for i in range(20):
    ax = fig.add_subplot(5, 4, i + 1)
    plot_test_image(X_test_original, predictions, i)

plt.show()


The quantization-aware model also gives us pretty decent results, identifying the different keypoints in the images.

Ideas for Improving Accuracy

Here are some more ideas that you can try to improve your accuracy.
  • Use KFold training instead of a single train_test_split (see the sketch after this list)
  • Experiment with different strategies to fill NaN values instead of removing them
  • Use external datasets to increase the size of the training data
  • Use larger models such as EfficientNets
  • Use two models: one for images with 4 keypoints and another for images with 15 keypoints
  • Use a face detector (e.g., a YOLO-style architecture) to obtain the bounding box that looks most like a face, since many images contain very small faces
  • Use a face aligner
  • Experiment with more data augmentations, such as reflection, zooming, shifting, blurring, noise, and elastic transformations
  • Train multiple models and ensemble them as the final model
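As a sketch of the first idea, a hypothetical KFold training loop over our prepared arrays might look like this (illustration only; each fold trains a fresh model, and we do not run it in this article):

from sklearn.model_selection import KFold

# Hypothetical KFold training loop (illustration only)
X_all, y_all, _ = process_data()
X_all = X_all.reshape((-1, 96, 96, 1))
X_all = np.concatenate((X_all, X_all, X_all), axis=-1)  # 3 channels for ResNet

kf = KFold(n_splits=5, shuffle=True, random_state=0)

for fold, (train_idx, valid_idx) in enumerate(kf.split(X_all)):
    print(f"Fold {fold}")
    model = make_model()
    model.fit(
        X_all[train_idx], y_all.iloc[train_idx],
        validation_data=(X_all[valid_idx], y_all.iloc[valid_idx]),
        batch_size=config.BATCH_SIZE,
        epochs=config.NUM_EPOCHS,
    )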

Conclusion

We conclude that both Post-Training Quantization and Quantization-Aware Training slightly improved the model's accuracy.
However, with Post-Training Quantization we were able to reduce the model size from 228 MB to 25 MB, whereas after Quantization-Aware Training the model size was 97 MB.
Thus, for this use case, Post-Training Quantization is the best-performing method in terms of time, accuracy, and size.
Feel free to implement these methods on different use cases and compare the results.
The entire code is available on GitHub in the repository facial-keypoints-detection.
If you still face any difficulties reach out to me on LinkedIn or Twitter, my messages are open :)